-
Notifications
You must be signed in to change notification settings - Fork 175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(connect): sql #3696
feat(connect): sql #3696
Conversation
CodSpeed Performance ReportMerging #3696 will degrade performances by 16.83%Comparing Summary
Benchmarks breakdown
|
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3696 +/- ##
==========================================
- Coverage 77.87% 77.06% -0.82%
==========================================
Files 721 720 -1
Lines 90505 91795 +1290
==========================================
+ Hits 70479 70738 +259
- Misses 20026 21057 +1031
|
|
||
let plan = LogicalPlanBuilder::from(plan); | ||
|
||
// TODO: code duplication |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this reminder for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's a pretty big block of code duplicated in a few spots that I want to clean up.
// TODO: code duplication
let result_complete = res.result_complete_response();
let (tx, rx) = tokio::sync::mpsc::channel::<eyre::Result<ExecutePlanResponse>>(1);
let this = self.clone();
tokio::spawn(async move {
let execution_fut = async {
let mut result_stream = this.run_query(plan).await?;
while let Some(result) = result_stream.next().await {
let result = result?;
let tables = result.get_tables()?;
for table in tables.as_slice() {
let response = res.arrow_batch_response(table)?;
if tx.send(Ok(response)).await.is_err() {
return Ok(());
}
}
}
Ok(())
};
if let Err(e) = execution_fut.await {
let _ = tx.send(Err(e)).await;
}
});
let stream = ReceiverStream::new(rx);
let stream = stream
.map_err(|e| {
Status::internal(
textwrap::wrap(&format!("Error in Daft server: {e}"), 120).join("\n"),
)
})
.chain(stream::once(ready(Ok(result_complete))));
Ok(Box::pin(stream))
@universalmind303 You mentioned that there is still some work remaining on the python side of things. What other things are left? |
right now there is a daft catalog and a sql catalog on the python side. we should consolidate these into a single catalog. |
Description
adds spark's
df.createOrReplaceTempView
andspark.sql
to enable sql workflows.ex:
Note for reviewers:
in order for it to be easier to interop with connect, I removed the
SQLCatalog
and refactored theSQLPlanner
to take in aDaftMetaCatalog
instead. This partially closes #3586. I think we still need to do some work on the python side of things though.